enhance documentation #61
Conversation
Thank you very much @chulucninh09, it looks great. I added a few minor questions/comments.
debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example
This connector only packages with support for `hadoop` catalog.

## No automatic schema evolution
Currently, there is no handler to detect schema changes and auto evolve the schema. Schema change events can make the connector throw error. To workaround this, turn off schema change event in `source` setting.
Have you had a chance to test it? I think schema change events are not processed and should not cause any error. Schema change events are saved to a file if the connector is configured with `debezium.source.database.history` and `debezium.source.database.history.file.filename`.
I believe we could remove this section:
> Schema change events can make the connector throw error. To workaround this, turn off schema change event in source setting.

and link to this page to give more details about the current schema change behavior:
Schema Change Behaviour: https://github.com/memiiso/debezium-server-iceberg/blob/master/docs/DOCS.md#schema-change-behaviour
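The two properties mentioned above could be sketched in `application.properties` like this (a minimal sketch; the history class and file path are assumed placeholder values, not taken from this PR):

```properties
# Persist the database schema history to a local file so schema change
# events are recorded there. Class and path are illustrative placeholders.
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/history.dat
```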
This is what I found when using this with SQL Server; you can check the following log:
2021-11-28 14:38:51,416 INFO [io.deb.ser.ice.tab.IcebergTableOperatorUpsert] (pool-7-thread-1) Committed 2047 events to table! s3a://test-iceberg/iceberg_warehouse8/debeziumevents/debeziumcdc_tutorial_dbo_person
2021-11-28 14:38:51,538 WARN [io.deb.ser.ice.IcebergChangeConsumer] (pool-7-thread-1) Table not found: debeziumevents.debeziumcdc_tutorial
2021-11-28 14:38:51,539 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) {"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"int64","optional":true,"field":"event_serial_no"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":true,"field":"databaseName"},{"type":"string","optional":true,"field":"schemaName"},{"type":"string","optional":true,"field":"ddl"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"field":"id"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"defaultCharsetName"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"primaryKeyColumnNames"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"jdbcType"},{"type":"int32","optional":true,"field":"nativeType"},{"type":"string","optional":false,"field":"typeName"},{"type":"string","optional":true,"field":"typeExpression"},{"type":"string","optional":true,"field":"charsetName"},{"type":"int32","optional":true,"field":"length"},{"type":"int32","optional":true,"field":"scale"},{"type":"int32","optional":false,"field":"position"},{"type":"bo
olean","optional":true,"field":"optional"},{"type":"boolean","optional":true,"field":"autoIncremented"},{"type":"boolean","optional":true,"field":"generated"}],"optional":false,"name":"io.debezium.connector.schema.Column"},"optional":false,"field":"columns"}],"optional":false,"name":"io.debezium.connector.schema.Table","field":"table"}],"optional":false,"name":"io.debezium.connector.schema.Change"},"optional":false,"field":"tableChanges"}],"optional":false,"name":"io.debezium.connector.sqlserver.SchemaChangeValue"}
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Converting Schema of: ::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [1] .source::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Converting Schema of: source::struct
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [2] source.version::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [3] source.connector::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [4] source.name::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [5] source.ts_ms::int64
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [6] source.snapshot::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [7] source.db::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [8] source.sequence::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [9] source.schema::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [10] source.table::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [11] source.change_lsn::string
2021-11-28 14:38:51,540 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [12] source.commit_lsn::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [13] source.event_serial_no::int64
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [14] .databaseName::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [15] .schemaName::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [16] .ddl::string
2021-11-28 14:38:51,541 DEBUG [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Processing Field: [17] .tableChanges::array
2021-11-28 14:38:51,541 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) Stopping down connector
2021-11-28 14:40:21,542 WARN [io.deb.pip.ChangeEventSourceCoordinator] (pool-7-thread-1) Coordinator didn't stop in the expected time, shutting down executor now
2021-11-28 14:40:24,324 WARN [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Snapshot was interrupted before completion
2021-11-28 14:40:24,325 INFO [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Snapshot - Final stage
2021-11-28 14:40:24,325 INFO [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Removing locking timeout
2021-11-28 14:40:24,327 WARN [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Change event source executor was interrupted: java.lang.InterruptedException
at java.base/java.lang.Object.wait(Native Method)
at io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
at io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:446)
at io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:176)
at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:89)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:49)
at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:165)
at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:386)
at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:315)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:135)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2021-11-28 14:40:24,328 INFO [io.deb.pip.met.StreamingChangeEventSourceMetrics] (debezium-sqlserverconnector-tutorial-change-event-source-coordinator) Connected metrics set to 'false'
2021-11-28 14:40:24,329 INFO [io.deb.jdb.JdbcConnection] (pool-14-thread-1) Connection gracefully closed
2021-11-28 14:40:24,330 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-7-thread-1) Stopped FileOffsetBackingStore
2021-11-28 14:40:24,331 INFO [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: 'tableChanges' has Array type, Array type not supported!', error = '{}': java.lang.RuntimeException: 'tableChanges' has Array type, Array type not supported!
at io.debezium.server.iceberg.IcebergUtil.getIcebergSchema(IcebergUtil.java:74)
at io.debezium.server.iceberg.IcebergUtil.getIcebergSchema(IcebergUtil.java:35)
at io.debezium.server.iceberg.IcebergUtil.getIcebergFieldsFromEventSchema(IcebergUtil.java:199)
at io.debezium.server.iceberg.IcebergChangeConsumer.createIcebergTable(IcebergChangeConsumer.java:199)
at io.debezium.server.iceberg.IcebergChangeConsumer.lambda$handleBatch$2(IcebergChangeConsumer.java:159)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:159)
at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:821)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:145)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2021-11-28 14:40:24,348 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2021-11-28 14:40:24,348 INFO [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
2021-11-28 14:40:24,367 INFO [io.quarkus] (main) debezium-server-iceberg-dist stopped in 0.035s
This is the config I used when I didn't turn off schema change capture for SQL Server:
# sql server source
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=52.221.232.4
debezium.source.database.port=1433
debezium.source.database.user=debezium
debezium.source.database.password=debezium
debezium.source.database.dbname=dms_sample
debezium.source.database.server.name=tutorial
debezium.source.table.include.list=dbo.person
# mandatory for SQL Server source; avoids errors during snapshot and schema change capture
#debezium.source.include.schema.changes=false
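Based on this discussion, the workaround is to enable the commented-out line above, i.e. disable schema change capture in the source configuration (a minimal sketch):

```properties
# Disable schema change events; the Iceberg consumer cannot create the
# schema-change table because its tableChanges field is an Array type.
debezium.source.include.schema.changes=false
```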
It seems like the consumer is trying to create the table `debeziumevents.debeziumcdc_tutorial` to store schema changes, but it's failing because the field `tableChanges` has Array type. The Iceberg consumer currently does not support the Array data type, which is why the exception is thrown.
Not sure what is the best way to explain it. Maybe something like:
> The schema change topic has the unsupported data type Array; it is recommended to disable it.
It seems like even with Array data type support it's not possible to save the schema change topic. `tableChanges` seems to have a kind of special type, so it makes sense to recommend disabling it for all connectors.
It's failing with:
Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.Object,java.lang.Object>` from Array value (token `JsonToken.START_ARRAY`)
Yes, I also checked the Iceberg documentation when I found that error and decided to avoid it by turning off schema change events.
docs/CAVEATS.md (Outdated)
By default, the Debezium connector will publish snapshots of all the tables in the database, which leads to unnecessary Iceberg table snapshots of all tables. Unless you want to replicate every table from the database into Iceberg, set `debezium.source.table.include.list` to the specific tables that you want to replicate. This way, you avoid replicating tables that you don't really want.

## AWS S3 credentials
You should inject the environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_ACCESS_KEY` to write to S3, or set up a proper `HADOOP_HOME` env and add the s3a configuration into `core-site.xml`; more information can be found [here](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_with_S3).
I believe it is possible to use `application.properties` to configure AWS S3. All the settings starting with `debezium.sink.iceberg.<my.config>` are passed to Iceberg and used from there.
Example from a unit test:
https://github.com/memiiso/debezium-server-iceberg/blob/master/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java#L131
Yup, saw that too. By default, I suggest turning off `debezium.sink.iceberg.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain` to provide an easier way to inject AWS credentials.
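A minimal sketch of setting the credentials directly in `application.properties`, using the passthrough properties discussed in this thread (the key values are placeholders):

```properties
# Hadoop S3A credentials passed through the iceberg sink prefix.
# Replace the placeholder values with real credentials.
debezium.sink.iceberg.fs.s3a.access.key=MY_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.secret.key=MY_SECRET_KEY
```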
## AWS S3 credentials
You can set up AWS credentials in the following ways:
- Option 1: use `debezium.sink.iceberg.fs.s3a.access.key` and `debezium.sink.iceberg.fs.s3a.secret.key` in `application.properties`
Could we highlight that it is possible to pass any Iceberg configuration using the `debezium.sink.iceberg.<my.iceberg.config>=xyz` format? It could be useful to know that there are many Iceberg configs and that it's possible to set them with `application.properties`.
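As an illustrative sketch of that passthrough format (both values below are hypothetical examples, not settings from this PR):

```properties
# Any Iceberg/Hadoop setting can be passed with the
# debezium.sink.iceberg. prefix; the values here are examples only.
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
```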
I believe the detailed docs already mention it. You can check it here: https://github.com/chulucninh09/debezium-server-iceberg/blob/document-enhancement/docs/DOCS.md#configuring-iceberg
Merged, thank you @chulucninh09!